package com.dmp.report

import com.dmp.config.ConfigHandler
import com.dmp.util.{JPool, RptKpi}
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis

object AppAnalysisCore {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("媒体分析-core实现方式")
    sparkConf.setMaster("local[*]")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // 读取
    val dataFrame = sqlContext.read.parquet(ConfigHandler.parquetFilePath)

    dataFrame.mapPartitions(iter=>{
      //获取一个连接
      val jedis= JPool.getJedis
      val result=iter.map(row=>{
        //维度字段
          var appName: String = row.getAs[String]("appname")
          val appId: String = row.getAs[String]("appid")
        if(StringUtils.isEmpty(appName)){
          if(StringUtils.isNotEmpty(appId)){
            jedis.hget("appdice",appId)
            if(StringUtils.isEmpty(appId)) appName="未知"
          }else{
            appName="未知"
          }
        }
        (appName,RptKpi.apply(row))
      })
      jedis.close()
      result
    })
      .reduceByKey((list1,list2)=>list1.zip(list2).map(tp=>tp._1+tp._2))
      .sortBy(tp=>tp._2.head,false)
      .saveAsTextFile("f:/Violet/report/app1")
  sc.stop()
  }
}
